package p2p

import (
	"bufio"
	"encoding/json"
	"fmt"
	"osiris/core/txpool"
	"osiris/dto"
	"osiris/logger"
	"sync"

	"github.com/libp2p/go-libp2p/core/network"
	peer "github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/core/protocol"
)

// TxProtocol 交易广播
type TxProtocol struct {
	MulticastProtocol
}

func (protocol TxProtocol) protocolName() protocol.ID {
	return "/p2p/multicast/tx"
}

func (protocol TxProtocol) openStream(peerID peer.ID, jsonStr string, wg *sync.WaitGroup) {
	defer func() {
		if wg != nil {
			wg.Done()
		}
	}()

	protocol.openMultiStream(protocol.protocolName(), jsonStr)
}

func (protocol TxProtocol) handleStream(s network.Stream) {
	logger.Println("Got a new stream " + protocol.protocolName())
	logger.Info(map[string]interface{}{fmt.Sprintf("[p2p] [Handle Stream %s]", protocol.protocolName()): "Got a new stream "})

	// Create a buffer stream for non blocking read and write.
	rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))
	wg := new(sync.WaitGroup)
	wg.Add(2)
	verifyCh := make(chan bool)
	relayJsonCh := make(chan string)

	//读协程
	go func() {
		defer wg.Done()

		str, err := rw.ReadString('\n')
		if err != nil {
			verifyCh <- false
			logger.Error(map[string]interface{}{fmt.Sprintf("[p2p] [Handle Stream %s] read string", protocol.protocolName()): err})
			return
		}

		var txDto dto.TxDto
		if err1 := json.Unmarshal([]byte(str), &txDto); err1 != nil {
			verifyCh <- false
			logger.Error(map[string]interface{}{fmt.Sprintf("[p2p] [Handle stream %s] TxDto unmarshal", protocol.protocolName()): err1})
			return
		}

		//检查是否为重复txDto
		if txpool.IsHashInBuf(txDto.DtoHash) {
			verifyCh <- false
			logger.Info(map[string]interface{}{fmt.Sprintf("[p2p] [Handle stream %s] ", protocol.protocolName()): "duplicate TxDto"})
			return
		}
		//不是重复区块的话，将区块加入缓存
		txpool.AddRecentTxDtoHash(txDto.DtoHash)

		//验证TxDto的Hash、节点签名
		if !txDto.Verify() {
			verifyCh <- false
			logger.Warn(map[string]interface{}{fmt.Sprintf("[p2p] [Handle stream %s] verify txDto", protocol.protocolName()): "fail"})
			return
		}

		//验证TxData的Hash、账户签名(交易由节点发起的话无需再次验证，比如TX_RELEASE)
		if txDto.TxData.TxType != dto.TX_RELEASE && !txDto.TxData.Verify() {
			verifyCh <- false
			logger.Warn(map[string]interface{}{fmt.Sprintf("[p2p] [Handle stream %s] verify txData", protocol.protocolName()): "fail"})
			return
		}

		succeed := txpool.AddTx(&txDto.TxData)
		if !succeed {
			logger.Warn(map[string]interface{}{fmt.Sprintf("[p2p] [Handle stream %s] verify txData", protocol.protocolName()): "tx denied by txpool"})
			verifyCh <- false
		}

		//全部验证通过，同时将交易加入交易池
		verifyCh <- true
		relayJsonCh <- str
	}()

	//将发送方的peer.ID排除在转发目标外
	fromPeerID := s.Conn().RemotePeer()
	protocol.DynamicExcluded[fromPeerID] = true

	//转发协程
	go func() {
		defer wg.Done()

		valid := <-verifyCh
		if !valid {
			return
		}

		jsonStr := <-relayJsonCh
		protocol.openMultiStream(protocol.protocolName(), jsonStr)
	}()

	wg.Wait()
}
